Kafka Connect: Surface commit failures instead of silently swallowing them#16237
Kafka Connect: Surface commit failures instead of silently swallowing them#16237yadavay-amzn wants to merge 3 commits into
Conversation
Baunsgaard
left a comment
There was a problem hiding this comment.
Good catch and cleanup.
However, the error logging strategy you are proposing seems to be double-logging every commit failure in CoordinatorThread.run(). I have left some specific suggestions.
dd4620e to
8467e0d
Compare
|
Thanks @Baunsgaard for taking a look, you're right about the double-logging. |
Baunsgaard
left a comment
There was a problem hiding this comment.
LGTM, left one nit for production code. Tests looks fine!
8467e0d to
97cdeb1
Compare
|
Done, removed the comment block. |
laskoviymishka
left a comment
There was a problem hiding this comment.
Thanks for tackling #15878, the underlying data-loss bug is real.
Silently swallowing CommitFailedException means Connect can keep the task RUNNING while dropping in-flight data, so the fix direction is right: failures need to surface and move the task to FAILED.
Before merge, I’d resolve a few mismatches between the PR description and the actual diff, since these will be confusing later in git log / blame:
-
“log at ERROR level ... before rethrowing”
The code does not rethrow the original exception. It wraps it in a newRuntimeException(String.format(...), e), soCommitFailedExceptionbecomes a genericRuntimeExceptionwith the original only available asgetCause(). That may lose useful signal for operator alerting / log pattern matching. I’d either rethrowedirectly after logging, or update the description to say “wrap and rethrow with context.” -
“removes the catch-all around
doCommit()”
The catch is not removed; it is narrowed fromcatch (Exception)tocatch (RuntimeException). That may be fine, but the PR should say that explicitly. Otherwise, either keepcatch (Exception)or remove the block entirely and rely on the outer catch inCoordinatorThread.run(). -
“
CoordinatorThread.run()terminates the thread on uncaught exceptions, which transitions the Kafka Connect task toFAILED”
The end state is right, but the mechanism is different.run()catchesException, logs, and setsterminated = true. The Connect task transitions later whenCommitterImpl.save()→processControlEvents()seescoordinatorThread.isTerminated()and throwsNotRunningException. Worth tightening the description so future readers don’t learn the wrong invariant.
One behavioral change I’d like a Kafka Connect domain opinion on: this also makes the partial-commit path — commit(true), triggered by commitState.isCommitTimedOut() — fatal on any RuntimeException. Previously, a transient blip during partial commit was swallowed and retried; now it terminates the coordinator and needs operator intervention.
Is that the intended trade-off, or should the rethrow be gated on !partialCommit?
@AnatolyPopov, can you take a look, would value your read on two things:
- should partial-commit failures be equally fatal?
- does wrapping vs rethrowing the original
CommitFailedExceptionmatter for downstream alerting in your deployments?
Inline comments below cover the test-side observations: spy reuse, brittle post-throw assertions, and missing partial-commit test.
|
Thanks for the thorough review @laskoviymishka. Addressed all points:
Will update the PR description to accurately reflect the narrowed catch and the mechanism for task FAILED transition. |
There was a problem hiding this comment.
Looks good now, all the description/code mismatches are resolved and the partial-commit gate is in.
Small nit: the Testing section still references testCoordinatorWithBadDataFile, but the actually-modified test is testCommitError, worth a quick fix. Plus CI is a bit red.
Approving; will wait for @AnatolyPopov's domain read before merge.
18292d5 to
f2a993a
Compare
|
@laskoviymishka @Baunsgaard Heads up — I made a change after your approvals that I want to flag: The exception handling now uses a tiered approach instead of rethrowing everything:
The reason: rethrowing ALL non- CI is now green. Let me know if you'd prefer a different approach — e.g., a consecutive-failure counter that rethrows after N failures regardless of exception type. |
| .build(); | ||
|
|
||
| coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null); | ||
| assertThatThrownBy( |
There was a problem hiding this comment.
coordinatorTest() calls coordinator.process() directly, so it skips the real production path.
In prod the flow is:
CoordinatorThread.run() catches the exception → marks terminated = true → next CommitterImpl.save() calls processControlEvents() → throws NotRunningException.
This test doesn’t cover that. It would still pass even if CoordinatorThread swallowed the failure.
I think we need an end-to-end test that goes through CoordinatorThread + CommitterImpl.
|
that's quite a swing @yadavay-amzn, and the new strategy changed enough that I need to flip to request some changes / questions. The new behavior is basically:
I don’t think that’s safe. The previous direction was closer to correct:
Main issues:
I’d rather go back to the old shape: rethrow on full commit failure, swallow only the timed-out partial commit cleanup failure. If we really want retries here, I think we need to explicitly list retryable exception types, bound the retries, and document the buffer lifecycle. Also the PR description drifted from the code again, so that needs an update too. @AnatolyPopov — still interested in your read on the worker/coordinator offset ordering, since that’s the key assumption behind “retry next cycle.” |
f2a993a to
e66f880
Compare
|
@laskoviymishka You're right, the
This matches your original direction (please correct me if I'm wrong). The On the test coverage point (coordinatorTest skipping the production CoordinatorThread path), agreed that an end-to-end test through CoordinatorThread + CommitterImpl would be stronger. Happy to add that in this PR or as a follow-up, whichever you prefer. |
There was a problem hiding this comment.
thanks @yadavay-amzn! We’re back on a good track — back on the road to 🚢.
Three small nits I’d like to land before merge, all 1–3 line fixes.
None of them are blocking on their own, but they’re cheap to do in this PR.
UPD: I see a follow up issue, i'm good with just 3 one-liners before merge.
I strongly consider the end-to-end CoordinatorThread + CommitterImpl test needed here, but it may be a bigger change.
| taskId, | ||
| commitState.currentCommitId(), | ||
| e); | ||
| } catch (RuntimeException e) { |
There was a problem hiding this comment.
Two small things on this catch block:
SLF4J double-printing the message. Both LOG calls pass e.getMessage() as a {} arg and e as the trailing throwable. SLF4J already prints the exception's message (and full stack) when the last arg is a Throwable, so the message ends up rendered twice in every log line.
taskId asymmetry. It's in the ERROR but dropped from the WARN. In a multi-task cluster you'd lose task identity for retried partial commits.
Both fixed at once:
LOG.warn("Partial commit {} failed for task {}, will retry",
commitState.currentCommitId(), taskId, e);
...
LOG.error("Commit {} failed for task {}",
commitState.currentCommitId(), taskId, e);| assertThatThrownBy( | ||
| () -> coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null)) | ||
| .isInstanceOf(IllegalArgumentException.class) | ||
| .hasMessageContaining("Cannot find partition spec"); |
There was a problem hiding this comment.
the prev-prev version of this test also asserted producer.history().hasSize(1) (only the StartCommit event was sent, no CommitToTable) and table.snapshots().isEmpty() (no phantom commit landed). v4 drops both.
The test now only proves an exception is thrown, it would still pass if a future regression sent a CommitToTable event before failing, or somehow committed-then-threw.
Maybe keep side-effect guards:
assertThatThrownBy(...)
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot find partition spec");
assertThat(producer.history()).hasSize(1);
assertThat(table.snapshots()).isEmpty();| ImmutableList.of(), | ||
| EventTestUtil.now())) | ||
| .isInstanceOf(ValidationException.class) | ||
| .hasMessageContaining("stale offsets"); |
There was a problem hiding this comment.
Same shape as the comment above, stale-offset conflict the table was not mutated (snapshots().hasSize(2) and offset still {"0":7}).
Those are the actual correctness guarantees of the optimistic-concurrency guard, and they're no longer checked. The exception-type assertion alone would pass even if a future regression committed the row delta before throwing.
Lets keep the post-throw assertions:
assertThatThrownBy(...)
.isInstanceOf(ValidationException.class)
.hasMessageContaining("stale offsets");
table.refresh();
assertThat(table.snapshots()).hasSize(2);
assertThat(table.currentSnapshot().summary())
.containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");… them Narrow the catch around doCommit() and rethrow on full-commit failures. Partial-commit failures (triggered by commit timeout) are logged at WARN and swallowed since the coordinator will retry on the next cycle. This ensures commit failures surface to operators by terminating the coordinator thread, which transitions the Connect task to FAILED. Fixes apache#15878
e66f880 to
6e3325f
Compare
|
Addressed all three nits:
Thanks @laskoviymishka! |
laskoviymishka
left a comment
There was a problem hiding this comment.
Nice, all clean, nits addressed plus no unexpected scope changes :)
LGTM from my side, will wait for a Kafka Connect deep-dive from @AnatolyPopov before merge.
AnatolyPopov
left a comment
There was a problem hiding this comment.
Hi all, and thanks @laskoviymishka for pinging me on this.
Overall LGTM with some comments.
First of all I do not think partial commits need to be fatal but I suspect they can become dominant on overloaded Connect clusters. I think this is out of scope of this PR but as an operator I might want to see a metric for those and this can be done as a follow up if needed.
The exception wrapping does not really matter as well, what matters usually is the task status.
I would also consider to retry on some specific exceptions to avoid the need for operator intervention in case of task failure on transitive errors as was discussed before but in any case this is strictly better then we have now and retries could be considered as a follow up.
On the ordering — I think it works for what this PR is fixing. Workers commit source offsets atomically with the DataWritten event in a single producer transaction, and the coordinator only advances the control-topic offset on a successful commit. The two ways this can break are control-topic retention shorter than the recovery window, and orphan-file cleanup running before recovery AFAIU. Both are operator side knobs rather than connector behavior, so I'd consider them as misconfiguration, but maybe worth a doc note to make operators aware.
Thanks @yadavay-amzn for working through this.
|
Thanks @AnatolyPopov for the thorough review and the context on the offset ordering! And thanks @laskoviymishka for driving this to a good shape. Agreed on the follow-ups, I will create issues to track:
Both are additive and can land independently. |
Fixes #15878.
Problem
The Kafka Connect
Coordinatorpreviously caughtExceptionarounddoCommit()and only logged a warning, so when a commit failed (e.g.,CommitFailedExceptionfrom Glue detecting a concurrent table update), the connector stayedRUNNINGwhile silently dropping the data that was in flight.Fix
Rethrow all exceptions from
doCommit()on full-commit failures. Partial-commit failures (triggered bycommitState.isCommitTimedOut()) are logged at WARN and swallowed since the coordinator will retry on the next cycle.This ensures commit failures surface to operators by terminating the coordinator thread, which transitions the Connect task to FAILED via
CommitterImpl.processControlEvents()detectingcoordinatorThread.isTerminated().The
finallyblock that callscommitState.endCurrentCommit()is preserved so per-commit state is cleaned up regardless of the outcome.Testing
testCommitFailedExceptionPropagates: verifies thatCommitFailedExceptionpropagates and kills the coordinator on full commit.testCommitError: verifies thatIllegalArgumentException(bad partition spec) propagates.testCoordinatorCommittedOffsetValidation: verifies thatValidationException(stale offsets) propagates.TestCoordinatorsuite + Kafka Connect integration tests pass locally (10/10 runs).spotlessCheck+checkstyleMain+checkstyleTestpass.